import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import mplfinance as mpf
import matplotlib.dates as mdates
import datetime as dt
import plotly.graph_objects as go
import plotly.express as px
import plotly.io as pio
from plotly.subplots import make_subplots
pio.renderers.default = "notebook"
pio.templates.default = "plotly_dark"
import gc
import warnings
warnings.filterwarnings("ignore")
plt.rcParams["figure.figsize"] = [12, 8]
def merge_two_stocks(
df1: pd.DataFrame,
df2: pd.DataFrame,
names=["df1", "df2"],
columns=None,
date_too=True,
) -> pd.DataFrame:
"""
Merge two stocks together on index (Assumes index is date)
Parameters
----------
df1 : pd.DataFrame
First dataframe
df2 : pd.DataFrame
Second dataframe
names : list, optional
Names of the two dataframes (Stock names, suffix will be decided by it), by default ["df1", "df2"]
columns : list, optional
Columns to merge, by default None
date_too : bool, optional
Whether to include the date column, by default True
Returns
-------
pd.DataFrame
Merged dataframe
"""
df1 = df1.copy()
df2 = df2.copy()
if columns:
df1 = df1[columns]
df2 = df2[columns]
df1.index = pd.Series(df1.index).apply(lambda x: x.strftime("%Y-%m-%d"))
df2.index = pd.Series(df2.index).apply(lambda x: x.strftime("%Y-%m-%d"))
df = df1.merge(
df2,
how="inner",
left_index=True,
right_index=True,
suffixes=("_" + names[0], "_" + names[1]),
)
if date_too:
df.index = pd.to_datetime(df.index)
df["Date"] = df.index
if len(columns) == 1 and date_too:
df.columns = [names[0], names[1], "Date"]
elif len(columns) == 1 and not date_too:
df.columns = [names[0], names[1]]
return df
def get_beta(df, freq="Y", names=["df1", "df2"]):
"""
Get beta of two stocks
Parameters
----------
df : pd.DataFrame
Dataframe of two stocks with date as index
freq : str, optional
Frequency of the data, by default "Y"
Returns
-------
float
Beta of the two stocks
"""
df = df.copy()
df = df.asfreq(freq).dropna()
df["Returns_" + names[0]] = df[names[0]].pct_change()
df["Returns_" + names[1]] = df[names[1]].pct_change()
df = df.dropna()
covariance = df["Returns_" + names[0]].cov(df["Returns_" + names[1]])
variance = df["Returns_" + names[1]].var()
beta = covariance / variance
return beta
def expected_return(rf, beta, Erm):
"""
Get expected return of a stock
Parameters
----------
rf : float
Risk free rate
beta : float
Beta of the stock
Erm : float
Expected return of the market
Returns
-------
float
Expected return of the stock
"""
return rf + beta * (Erm - rf)
apple = pd.read_csv("Data/AAPL.csv", parse_dates=["Date"], index_col="Date")
google = pd.read_csv("Data/GOOG.csv", parse_dates=["Date"], index_col="Date")
snp = pd.read_csv("Data/GSPC.csv", parse_dates=["Date"], index_col="Date")
gold = pd.read_csv("Data/gold.csv", parse_dates=["Date"], index_col="Date")
oil = pd.read_csv("Data/oil.csv", parse_dates=["Date"], index_col="Date")
treasury = pd.read_csv("Data/treasury.csv", parse_dates=["Date"], index_col="Date")
apple = apple[apple.index >= pd.to_datetime("2012-01-01").tz_localize(utc)]
google = google[google.index >= "2012-01-01"]
snp = snp[snp.index >= "2012-01-01"]
gold = gold[gold.index >= "2012-01-01"]
oil = oil[oil.index >= "2012-01-01"]
treasury = treasury[treasury.index >= "2012-01-01"]
It's a model of the optimal portfolio. It asserts that all investors will hold the optimal portfolio. But as not everyone holds the optimal portfolio, the model is only the half truth.
The model assumes that everyone is rational. It assumes that nobody has any risks that are inherent to them.

The basic equation of CAPM reads:
$$ E(r_i) = r_f + \beta_i (E(r_m) - r_f) $$where $r_i$ is the return of the stock, $r_f$ is the risk-free rate, $\beta_i$ is the beta of the stock, and $E(r_m)$ is the expected return of the market.
What is says is this: the expected return of a stock is the risk-free rate plus the beta of the stock times the expected return of the market minus the risk-free rate.
What is the risk-free rate? It is the return of a risk-free asset. For example, the return of a 10-year US Treasury bond.
Let's do some coding!
We'll deal with just the last 10 year. We'll use the S&P 500 as the market.
betas = {
"APPLE": None,
"GOOGLE": None,
"GOLD": None,
"OIL": None,
}
for stock in betas:
df = merge_two_stocks(
eval(stock.lower()), snp, columns=["Close"], names=[stock, "S&P 500"]
)
betas[stock] = get_beta(df, names=[stock, "S&P 500"], freq="M")
print(betas)
rf = treasury.iloc[-1]["Close"] / 12
rf
apple_beta = betas["APPLE"]
google_beta = betas["GOOGLE"]
gold_beta = betas["GOLD"]
oil_beta = betas["OIL"]
print(f"Apple Beta: {apple_beta}")
print(f"Google Beta: {google_beta}")
print(f"Gold Beta: {gold_beta}")
print(f"Oil Beta: {oil_beta}")
Erm = snp["Close"].pct_change().mean()
Erm = Erm * 365 / 12 * 100
print(f"Monthly Expected return of S&P 500: {Erm:.4f}%")
betas = [apple_beta, google_beta, gold_beta, oil_beta]
stocks = ["Apple", "Google", "Gold", "Oil"]
for beta, stock in zip(betas, stocks):
print(f"Beta of {stock}: {beta:.4f}")
print(f"Monthly Expected return of {stock}: {expected_return(rf, beta, Erm):.4f}%")


def portfolio_expted_value(weights, expected_returns):
"""
Get expected value of a portfolio
Parameters
----------
weights : list
Weights of the portfolio
expected_returns : list
Expected returns of the stocks in the portfolio
Returns
-------
float
Expected value of the portfolio
"""
return sum([w * r for w, r in zip(weights, expected_returns)])
def portfolio_variance(weights, cov_matrix):
"""
Get variance of a portfolio
Parameters
----------
weights : list
Weights of the portfolio
cov_matrix : np.array
Covariance matrix of the stocks in the portfolio
Returns
-------
float
Variance of the portfolio
"""
return np.dot(weights, np.dot(cov_matrix, weights))
def portfolio_std(weights, cov_matrix):
"""
Get standard deviation of a portfolio
Parameters
----------
weights : list
Weights of the portfolio
cov_matrix : np.array
Covariance matrix of the stocks in the portfolio
Returns
-------
float
Standard deviation of the portfolio
"""
return np.sqrt(portfolio_variance(weights, cov_matrix))
def portfolio_info(weights, expected_returns, cov_matrix):
"""
Get expected value, variance and standard deviation of a portfolio
Parameters
----------
weights : list
Weights of the portfolio
expected_returns : list
Expected returns of the stocks in the portfolio
cov_matrix : np.array
Covariance matrix of the stocks in the portfolio
Returns
-------
tuple
Expected value, variance and standard deviation of the portfolio
"""
return (
portfolio_expted_value(weights, expected_returns),
portfolio_variance(weights, cov_matrix),
portfolio_std(weights, cov_matrix),
)
We'll use two stocks, Apple and Google. We'll use the S&P 500 as the market. We'll be using the last 10 years of data. Weights will be 1/2 for Apple and 1/2 for Google.
apple_monthly = apple.asfreq("M").dropna()
google_monthly = google.asfreq("M").dropna()
gold_monthly = gold.asfreq("M").dropna()
oil_monthly = oil.asfreq("M").dropna()
apple_monthly["Returns"] = apple_monthly["Close"].pct_change()
google_monthly["Returns"] = google_monthly["Close"].pct_change()
gold_monthly["Returns"] = gold_monthly["Close"].pct_change()
oil_monthly["Returns"] = oil_monthly["Close"].pct_change()
apple_monthly = apple_monthly.dropna()
google_monthly = google_monthly.dropna()
gold_monthly = gold_monthly.dropna()
oil_monthly = oil_monthly.dropna()
apple_google = merge_two_stocks(
apple_monthly, google_monthly, columns=["Returns"], names=["APPLE", "GOOGLE"]
)
apple_google = apple_google.dropna()
apple_gold = merge_two_stocks(
apple_monthly, gold_monthly, columns=["Returns"], names=["APPLE", "GOLD"]
)
apple_gold = apple_gold.dropna()
apple_oil = merge_two_stocks(
apple_monthly, oil_monthly, columns=["Returns"], names=["APPLE", "OIL"]
)
apple_oil = apple_oil.dropna()
apple_google_cov = apple_google.cov()
apple_gold_cov = apple_gold.cov()
apple_oil_cov = apple_oil.cov()
print(f"Apple-Google Covariance: \n{apple_google_cov}")
print(f"Apple-Gold Covariance: \n{apple_gold_cov}")
print(f"Apple-Oil Covariance: \n{apple_oil_cov}")
expected, var, std = portfolio_info(
[0.5, 0.5],
[expected_return(rf, apple_beta, Erm), expected_return(rf, google_beta, Erm)],
apple_google_cov,
)
print(f"Expected return: {12*expected:.4f}%")
print(f"Variance: {var:.4f}")
print(f"Standard deviation: {100*std:.4f}")
The efficient portfolio of frontier expresses the standard deviation of the portfolio in terms of $r$ the expected return on the portfolio instead of $x_1$.

Here, we'll plot the efficient portfolio frontier for the last 10 years of data. We'll first use just two stocks, Apple and Google. We'll use the S&P 500 as the market.
weights_1 = np.linspace(0, 1.5, 300)
weights_2 = 1 - weights_1
stds = []
returns = []
for w1, w2 in zip(weights_1, weights_2):
expected, var, std = portfolio_info(
[w1, w2],
[expected_return(rf, apple_beta, Erm), expected_return(rf, google_beta, Erm)],
apple_google_cov,
)
stds.append(100 * std)
returns.append(12 * expected)
fig = px.line(
x=stds,
y=returns,
labels={"x": "Standard deviation", "y": "Expected return"},
title="Efficient frontier",
custom_data=[weights_1, weights_2],
)
fig.update_traces(
hovertemplate="Standard deviation: %{x:.4f}%<br>Expected return: %{y:.4f}%<br>Apple weight: %{customdata[0]:.4f}<br>Google weight: %{customdata[1]:.4f}"
)
weights_1 = np.linspace(0, 1.5, 300)
weights_2 = 1 - weights_1
stds = []
returns = []
for w1, w2 in zip(weights_1, weights_2):
expected, var, std = portfolio_info(
[w1, w2],
[expected_return(rf, apple_beta, Erm), expected_return(rf, gold_beta, Erm)],
apple_gold_cov,
)
stds.append(100 * std)
returns.append(12 * expected)
fig = px.line(
x=stds,
y=returns,
labels={"x": "Standard deviation", "y": "Expected return"},
title="Efficient frontier",
custom_data=[weights_1, weights_2],
)
fig.update_traces(
hovertemplate="Standard deviation: %{x:.4f}%<br>Expected return: %{y:.4f}%<br>Apple weight: %{customdata[0]:.4f}<br>gold weight: %{customdata[1]:.4f}"
)
weights_1 = np.linspace(0, 1.5, 300)
weights_2 = 1 - weights_1
stds = []
returns = []
for w1, w2 in zip(weights_1, weights_2):
expected, var, std = portfolio_info(
[w1, w2],
[expected_return(rf, apple_beta, Erm), expected_return(rf, oil_beta, Erm)],
apple_oil_cov,
)
stds.append(100 * std)
returns.append(12 * expected)
fig = px.line(
x=stds,
y=returns,
labels={"x": "Standard deviation", "y": "Expected return"},
title="Efficient frontier",
custom_data=[weights_1, weights_2],
)
fig.update_traces(
hovertemplate="Standard deviation: %{x:.4f}%<br>Expected return: %{y:.4f}%<br>Apple weight: %{customdata[0]:.4f}<br>oil weight: %{customdata[1]:.4f}"
)
Let's try three stocks. We'll use Apple, Google, and Gold. We'll use the S&P 500 as the market.
len(apple), len(google), len(gold), len(oil)
apple.index = pd.to_datetime(apple.index, utc=True)
df_merged = pd.merge_asof(
apple["Close"],
google["Close"],
left_index=True,
right_index=True,
direction="forward",
tolerance=pd.Timedelta("1D"),
suffixes=("_APPLE", "_GOOGLE"),
)
df_merged = pd.merge_asof(
df_merged,
gold["Close"],
left_index=True,
right_index=True,
direction="forward",
tolerance=pd.Timedelta("1D"),
suffixes=("", "_GOLD"),
)
# df_merged = pd.merge_asof(df_merged, oil["Close"], left_index=True, right_index=True, direction="forward", tolerance=pd.Timedelta("1D"), suffixes=("", "_OIL"))
df_merged = df_merged.dropna()
df_merged.columns = ["APPLE", "GOOGLE", "GOLD"]
df_merged.head()
df_merged_monthly = df_merged.asfreq("M", method="ffill")
df_merged_monthly["APPLE"] = df_merged_monthly["APPLE"].pct_change()
df_merged_monthly["GOOGLE"] = df_merged_monthly["GOOGLE"].pct_change()
df_merged_monthly["GOLD"] = df_merged_monthly["GOLD"].pct_change()
df_merged_monthly.dropna(inplace=True)
df_merged_monthly.head()
cov_matrix = df_merged_monthly.cov()
print(cov_matrix)
expected, var, std = portfolio_info(
[0.0, 1.0, 0.0],
[
expected_return(rf, apple_beta, Erm),
expected_return(rf, google_beta, Erm),
expected_return(rf, gold_beta, Erm),
],
cov_matrix,
)
print(f"Expected return: {12*expected:.4f}%")
print(f"Variance: {100*var:.4f}")
print(f"Standard deviation: {100*std:.4f}")
size = 300
weights_1 = np.linspace(0, 1, size)
weights_2 = 1 - weights_1
mask = np.arange(0, 100) * 0.01
weights_3 = [w for w in mask for _ in range(size // 100)]
weights_3 = np.array(weights_3).T
weights_3 = np.ravel(weights_3)
sum_ = weights_1 + weights_2 + weights_3
weights_1 = weights_1 / sum_
weights_2 = weights_2 / sum_
weights_3 = weights_3 / sum_
stds = []
returns = []
for w1, w2, w3 in zip(weights_1, weights_2, weights_3):
expected, var, std = portfolio_info(
[w1, w2, w3],
[
expected_return(rf, apple_beta, Erm),
expected_return(rf, google_beta, Erm),
expected_return(rf, gold_beta, Erm),
],
cov_matrix,
)
stds.append(100 * std)
returns.append(12 * expected)
portfolio_df = pd.DataFrame(
np.array([returns, stds, weights_1, weights_2, weights_3]).T,
columns=["Return", "STD", "Apple", "Google", "Gold"],
)
fig = px.line(
data_frame=portfolio_df,
x="STD",
y="Return",
labels={"x": "Standard deviation", "y": "Expected return"},
title="Efficient frontier",
custom_data=["Apple", "Google", "Gold"],
)
fig.update_traces(
hovertemplate="Standard deviation: %{x:.4f}%<br>Expected return: %{y:.4f}%<br>Apple weight: %{customdata[0]:.4f}<br>Google weight: %{customdata[1]:.4f}<br>Gold weight: %{customdata[2]:.4f}"
)
We'll assume that all the dataframes has index as date and have at least four columns- Open, High, Low, and Close. We'll also assume that the dataframes are sorted in ascending order of date. We'll write functions to:
apple = pd.read_csv("Data/AAPL.csv", parse_dates=["Date"], index_col="Date")
google = pd.read_csv("Data/GOOG.csv", parse_dates=["Date"], index_col="Date")
snp = pd.read_csv("Data/GSPC.csv", parse_dates=["Date"], index_col="Date")
gold = pd.read_csv("Data/gold.csv", parse_dates=["Date"], index_col="Date")
oil = pd.read_csv("Data/oil.csv", parse_dates=["Date"], index_col="Date")
treasury = pd.read_csv("Data/treasury.csv", parse_dates=["Date"], index_col="Date")
apple = apple[apple.index >= pd.to_datetime("2012-01-01").tz_localize(utc)]
google = google[google.index >= "2012-01-01"]
snp = snp[snp.index >= "2012-01-01"]
gold = gold[gold.index >= "2012-01-01"]
oil = oil[oil.index >= "2012-01-01"]
treasury = treasury[treasury.index >= "2012-01-01"]
def merge_dfs(dfs, names, drop=False):
"""
Merge dataframes on index
Parameters
----------
dfs : list
List of dataframes to merge
names : list
List of names of the dataframes
drop : bool, optional
Whether to drop rows with NaN values, by default False
Returns
-------
pd.DataFrame
Merged dataframe
"""
for df in dfs:
df = df.copy()
for df in dfs:
df.index = pd.to_datetime(df.index, utc=True)
df_merged = dfs[0]
for i in range(1, len(dfs)):
df_merged = pd.merge_asof(
df_merged,
dfs[i],
left_index=True,
right_index=True,
direction="forward",
tolerance=pd.Timedelta("1D"),
suffixes=("", f"_{names[i]}"),
)
columns = list(df_merged.columns)
to_modify = len(columns) // len(dfs)
for i in range(to_modify):
columns[i] = f"{columns[i]}_{names[0]}"
df_merged.columns = columns
if drop:
df_merged = df_merged.dropna()
return df_merged
mdf = merge_dfs(
[
apple["Close"],
google["Close"],
snp["Close"],
gold["Close"],
oil["Close"],
treasury["Close"],
],
["APPLE", "GOOGLE", "S&P", "GOLD", "OIL", "TREASURY"],
drop=True,
)
mdf.columns
def get_beta(df, freq="D", names=["df1", "df2"]):
"""
Get beta of two stocks
Parameters
----------
df : pd.DataFrame
Dataframe of two stocks with date as index
freq : str, optional
Frequency of the data, by default "D"
Returns
-------
float
Beta of the two stocks
"""
df = df.copy()
df = df.asfreq(freq).dropna()
df["Returns_" + names[0]] = df[names[0]].pct_change()
df["Returns_" + names[1]] = df[names[1]].pct_change()
df = df.dropna()
covariance = df["Returns_" + names[0]].cov(df["Returns_" + names[1]])
variance = df["Returns_" + names[1]].var()
beta = covariance / variance
return beta
apple_beta = get_beta(
mdf[["Close_APPLE", "Close_S&P"]], names=["Close_APPLE", "Close_S&P"], freq="M"
)
print(f"Apple beta: {apple_beta:.4f}")